DynamoDB から S3 への定期的なエクスポートの仕組みを AWS Glue と Step Functions を使用して実装してみた
コンバンハ、千葉(幸)です。
DynamoDB テーブルの中身を S3 バケットにエクスポートしたい、という場合があるかと思います。S3 にエクスポートしたものに対して、例えば Athena を利用して解析をかけたい、といったケースです。
AWS Glue や AWS Step Functions を利用して、定期的にエクスポートを行う仕組みについて以下のブログで紹介されているので、試してみました。 CloudFormation テンプレートや スクリプトが用意されているので、一通り流すだけでできます。
全体像としては以下のイメージです。
執筆時点より一年以上前のブログを参考にしていることもあり、サービスアップデートによりバッドプラクティスになってしまっている箇所があります。社内でフィードバックを受けましたので、数カ所で補足をします。
- Data Pipeline, EMR, Glue の比較
- Step Functions の Glue Workflow への置き換え
- Glue ETL スクリプトの 各種バージョン
- Glue ETL スクリプトのDynamoDB 読み取り並列度
目次
- DynamoDB から S3 へのバックアップのパターン
- やってみた
- 1. DynamoDB テーブルの作成
- 2. 共通スタックの作成
- 3. テーブルエクスポートスタックの作成
- デプロイされたリソースの確認
- 4. DynamoDB から S3 へのエクスポートの実行
- 5. エクスポート後の内容の確認
- 終わりに
DynamoDB から S3 へのバックアップのパターン
やってみる前に、全体像のおさらいをしておきます。
DynamoDB の中身を S3 にバックアップ・エクスポートする手法について、以下にまとまっています。
DynamoDB 標準機能によるバックアップ
ユーザー側で作り込みをしなくとも、DynamoDB では標準のバックアップの仕組みが用意されています。
- オンデマンドバックアップ
- ポイントインタイムリカバリ(継続的なバックアップ)
これらの機能により取得されたバックアップは S3 に保存されますが、ユーザーが該当 S3 にアクセスすることはできません。
ユーザー側での操作による DynamoDB バックアップ(エクスポート)
ユーザーがアクセスできる S3 バケットへ DynamoDB テーブルをエクスポートする手段として、以下が紹介されています。
- Data Pipeline
- メリット:最も簡単。AWS リソースの使用をできる限り少なく抑えながら1回限りのバックアップを作成する際に適する
- デメリット:他の方法と比較してカスマイズ性が乏しい。直近でのサービスアップデートが行われていない
- Amazon EMR
- メリット:詳細な設定が可能
- デメリット:Amazon EMR に対する習熟が必要
- AWS Glue
- メリット:Athena など他のサービスでも利用できる自動的かつ継続的なバックアップを行う際のベストプラクティス
- デメリット:AWS Glue に対する知見が必要。 Data Pipeline より料金がかかる
今回やってみるのは、一番最後の AWS Glue を使用するパターンです。
Glue はサーバーレスであるため、インフラのメンテナンスが必要ない点、高速に起動する点(Glue 2.0の場合)にメリットがあります。Data Pipeline は直近でのサービスアップデートはほとんど行われていないため、これから採用を検討する際には Glue を選択いただくのがよいかと思います。
参考までに、 Data Pipeline の使用イメージについては、以下エントリをご参照ください。
やってみた
今回は以下のステップに分けて実行していきます。基本的には提供されている CloudFormation テンプレートを使用します。
- DynamoDB テーブルの作成
- 共通スタックの作成
- テーブルエクスポートスタックの作成
- DynamoDB から S3 へのエクスポートの実行
- エクスポート後の内容の確認
1. DynamoDB テーブルの作成
まずはエクスポート元となる DynamoDB を準備します。すでに適当な DynamoDB がある場合は、このステップは飛ばして問題ありません。
今回は、以下のエントリで紹介されている手順に則り作成します。
ブログで紹介されている、以下のテンプレートを用いて CloudFormation スタックをデプロイします。
(ブログのボタンを押下して CloudFormation の画面に遷移した場合、バージニア北部が選択されている状態のため、必要に応じて別のリージョンに切り替えます。以降の手順でも同様です。)
Description: An example DynamoDB table for storing Reviews on books Resources: ReviewsTable: Type: AWS::DynamoDB::Table Properties: TableName: Reviews KeySchema: - AttributeName: User KeyType: HASH - AttributeName: Book KeyType: RANGE AttributeDefinitions: - AttributeName: User AttributeType: S - AttributeName: Book AttributeType: S ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5
ここで作成されるリソースは以下です。
論理 ID | タイプ | リソース名 |
---|---|---|
ReviewsTable | AWS::DynamoDB::Table | Reviews |
デプロイされた結果を確認すると。このようになっています。
AWS CLI でも確認しておきます。
% aws dynamodb describe-table --table-name Reviews { "Table": { "AttributeDefinitions": [ { "AttributeName": "Book", "AttributeType": "S" }, { "AttributeName": "User", "AttributeType": "S" } ], "TableName": "Reviews", "KeySchema": [ { "AttributeName": "User", "KeyType": "HASH" }, { "AttributeName": "Book", "KeyType": "RANGE" } ], "TableStatus": "ACTIVE", "CreationDateTime": "2020-09-11T20:41:41.764000+09:00", "ProvisionedThroughput": { "NumberOfDecreasesToday": 0, "ReadCapacityUnits": 5, "WriteCapacityUnits": 5 }, "TableSizeBytes": 0, "ItemCount": 0, "TableArn": "arn:aws:dynamodb:ap-northeast-1:000000000000:table/Reviews", "TableId": "4db9731f-1934-4964-abcc-da9de06207c5" } }
テーブルにアイテムを追加する
現状テーブルは空のままなので、アイテムを追加します。
[ 項目 ]-> [項目の作成 ]を選択し……
[ Text ]を選択し、内容を入力して[保存]を押下します。
なお、ここで入力するのは以下の値です。
{ "User": "Tristan", "Book": "Harry Potter and the Philosopher's Stone", "Rating": 5, "Review": "A thrilling journey through the world of Hogwarts", "Author": "J.K.Rowling" }
これをもう一度行い、2 項目を登録した状態にします。
2 回目に登録した内容は以下です。
{ "User": "Adeline", "Book": "Harry Potter and the Sorcerer's Stone", "Rating": 4, "Review": "Harry is pretty brave, but Hermione is the clear hero", "Author": "J.K.Rowling" }
これで DynamoDB の準備ができました。
2. 共通スタックの作成
冒頭のブログの一つ目のテンプレートを用いて実装します。ここで作成されるリソースは以下です。
論理 ID | タイプ | リソース名 |
---|---|---|
DynamoDBExportStateMachine | AWS::StepFunctions::StateMachine | DynamoDBExportAndAthenaLoad |
StateExecutionRole | AWS::IAM::Role | AWSBigDataBlog-GlueDynamoExport-StateExecutionRole-<ランダム文字列> |
StartGlueCrawler | AWS::Lambda::Function | AWSBigDataBlog-GlueDynamoExportTa-StartGlueCrawler-<ランダム文字列> |
GetGlueCrawlerStatus | AWS::Lambda::Function | AWSBigDataBlog-GlueDynamoExpo-GetGlueCrawlerStatus-<ランダム文字列> |
CreateCurrentView | AWS::Lambda::Function | AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-<ランダム文字列> |
GetCurrentViewStatus | AWS::Lambda::Function | AWSBigDataBlog-GlueDynamoExpo-GetCurrentViewStatus-<ランダム文字列> |
LambdaRole | AWS::IAM::Role | AWSBigDataBlog-GlueDynamoExportTableCom-LambdaRole-<ランダム文字列> |
DynamoDBExportsBucket | AWS::S3::Bucket | dynamodb-exports-<アカウント番号>-<リージョン> |
EventTriggerRole | AWS::IAM::Role | AWSBigDataBlog-GlueDynamoExportTa-EventTriggerRole-<ランダム文字列> |
GlueCrawlerAndJobRole | AWS::IAM::Role | AWSGlueServiceRoleDefault |
念のためテンプレートも載せておきます。
折り畳み
Description: Template for setting up common infrastructure for exporting DynamoDB tables to S3 with Step Functions Resources: DynamoDBExportsBucket: Type: "AWS::S3::Bucket" Properties: BucketName: Fn::Sub: "dynamodb-exports-${AWS::AccountId}-${AWS::Region}" GlueCrawlerAndJobRole: Type: AWS::IAM::Role Properties: # Referenced the following documentation while creating this IAM Role # http://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html RoleName: AWSGlueServiceRoleDefault AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: glue.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole - arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess - arn:aws:iam::aws:policy/AmazonDynamoDBReadOnlyAccess LambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - lambda.amazonaws.com ManagedPolicyArns: - arn:aws:iam::aws:policy/AWSLambdaExecute - arn:aws:iam::aws:policy/AmazonAthenaFullAccess Policies: - PolicyName: GlueJobTrigger PolicyDocument: Version: "2012-10-17" Statement: - Action: - glue:StartCrawler - glue:GetCrawler - glue:GetCrawlerMetrics - athena:StartQueryExecution - s3:ListBucket Effect: Allow Resource: "*" EventTriggerRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - events.amazonaws.com Policies: - PolicyName: GlueJobTrigger PolicyDocument: Version: "2012-10-17" Statement: - Action: - states:StartExecution Effect: Allow Resource: "*" StateExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - states.amazonaws.com Policies: - PolicyName: GlueJobTrigger PolicyDocument: Version: "2012-10-17" Statement: - Action: - "lambda:InvokeFunction" - "glue:StartJobRun" - "glue:GetJobRun" - "glue:GetJobRuns" - "glue:BatchStopJobRun" Effect: Allow Resource: "*" DynamoDBExportStateMachine: Type: "AWS::StepFunctions::StateMachine" Properties: StateMachineName: DynamoDBExportAndAthenaLoad RoleArn: Fn::GetAtt: - StateExecutionRole - Arn DefinitionString: Fn::Sub: - |- { "StartAt": "StartJobRun", "States": { "StartJobRun": { "Type": "Task", "ResultPath": "$.glueresult", "Resource": "arn:aws:states:::glue:startJobRun.sync", "Parameters": { "JobName.$": "$.glue_job_name", "Arguments": { "--table_name.$": "$.table_name", "--read_percentage.$": "$.read_percentage", "--output_prefix.$": "$.output_prefix", "--output_format.$": "$.output_format" } }, "Next": "Start Crawler", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Export Failed" } ] }, "Export Failed": { "Type": "Fail", "Cause": "One or more steps could not complete successfully", "Error": "ExportFailed" }, "Start Crawler": { "Type": "Task", "Resource": "${StartGlueCrawler}", "Next": "Wait for Crawler", "Retry": [ { "ErrorEquals": ["Lambda.ServiceException"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 1.5 } ] }, "Wait for Crawler": { "Type": "Wait", "Seconds": 60, "Next": "Get Crawler Status" }, "Get Crawler Status": { "Type": "Task", "Resource": "${GetGlueCrawlerStatus}", "Next": "Crawler Finished?" }, "Crawler Finished?": { "Type": "Choice", "Choices": [ { "Variable": "$.glue_crawler_status", "StringEquals": "SUCCEEDED", "Next": "Create Current View" }, { "Variable": "$.glue_crawler_status", "StringEquals": "CANCELLED", "Next": "Export Failed" }, { "Variable": "$.glue_crawler_status", "StringEquals": "FAILED", "Next": "Export Failed" } ], "Default": "Wait for Crawler" }, "Create Current View": { "Type": "Task", "Resource": "${CreateCurrentView}", "Next": "Wait for Current View", "Retry": [ { "ErrorEquals": ["Lambda.ServiceException"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 1.5 } ] }, "Wait for Current View": { "Type": "Wait", "Seconds": 5, "Next": "Get Current View Status" }, "Get Current View Status": { "Type": "Task", "Resource": "${GetCurrentViewStatus}", "Next": "Create Current View Finished?", "Retry": [ { "ErrorEquals": ["Lambda.ServiceException"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 1.5 } ] }, "Create Current View Finished?": { "Type": "Choice", "Choices": [ { "Variable": "$.athena_query_status", "StringEquals": "SUCCEEDED", "Next": "Done" }, { "Variable": "$.athena_query_status", "StringEquals": "FAILED", "Next": "Export Failed" }, { "Variable": "$.athena_query_status", "StringEquals": "CANCELLED", "Next": "Export Failed" } ], "Default": "Wait for Current View" }, "Done": { "Type": "Pass", "End": true } } } - StartGlueCrawler: Fn::GetAtt: - StartGlueCrawler - Arn GetGlueCrawlerStatus: Fn::GetAtt: - GetGlueCrawlerStatus - Arn CreateCurrentView: Fn::GetAtt: - CreateCurrentView - Arn GetCurrentViewStatus: Fn::GetAtt: - GetCurrentViewStatus - Arn StartGlueCrawler: Type: AWS::Lambda::Function Properties: Handler: "index.crawler_trigger" Runtime: python3.6 Timeout: "60" Role: Fn::GetAtt: - LambdaRole - Arn Code: ZipFile: |- import logging import datetime import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) KEY_CRAWLER_NAME = 'crawler_name' GLUE_CLIENT = boto3.client('glue') def crawler_trigger(event, context): crawler_name = event[KEY_CRAWLER_NAME] GLUE_CLIENT.start_crawler( Name=crawler_name ) return event GetGlueCrawlerStatus: Type: AWS::Lambda::Function Properties: Handler: "index.lambda_handler" Runtime: python3.6 Role: Fn::GetAtt: - LambdaRole - Arn Code: ZipFile: |- import json import logging import datetime from dateutil.parser import parse import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) GLUE_CLIENT = boto3.client('glue') KEY_GLUE_CRAWLER_NAME = "crawler_name" KEY_GLUE_CRAWLER_STATUS = "glue_crawler_status" KEY_STARTED_ON = "StartedOn" KEY_GLUE_RESULT = "glueresult" def lambda_handler(event, context): crawler_name = event[KEY_GLUE_CRAWLER_NAME] resp = GLUE_CLIENT.get_crawler(Name=crawler_name) print(resp) # In the case of an early exit make sure the key is present but the value # is a no op event[KEY_GLUE_CRAWLER_STATUS] = "" last_crawl = resp['Crawler'].get('LastCrawl') if last_crawl is None: return event last_crawl_start = last_crawl.get('StartTime') # Timestamp is stored as milliseconds since the epoch glue_job_start = datetime.datetime.fromtimestamp(event[KEY_GLUE_RESULT][KEY_STARTED_ON] / 1000) # If the last crawl is before the snapshot_timestamp then we're still waiting # for the current run we've initiated to finish if last_crawl_start is None or last_crawl_start.timestamp() < glue_job_start.timestamp(): return event event[KEY_GLUE_CRAWLER_STATUS] = last_crawl['Status'] return event CreateCurrentView: Type: AWS::Lambda::Function Properties: Handler: "index.create_current_view_trigger" Runtime: python3.6 Timeout: 30 Role: Fn::GetAtt: - LambdaRole - Arn Code: ZipFile: |- import logging import os import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) KEY_TABLE_NAME = "table_name" SNAPHSHOT_TIMESTAMP = "snapshot_timestamp" KEY_OUTPUT_PREFIX = "output_prefix" ATHENA_CLIENT = boto3.client('athena') S3_CLIENT = boto3.client('s3') def create_current_view_trigger(event, context): table_name = event[KEY_TABLE_NAME].lower().replace("-", "_") output_prefix = event[KEY_OUTPUT_PREFIX] snapshot_timestamp = get_snapshot_timestamp(output_prefix) query = f"CREATE OR REPLACE VIEW \"dynamodb_exports\".\"{table_name}\" AS SELECT * FROM \"dynamodb_exports\".\"snapshots_{table_name}\" WHERE {SNAPHSHOT_TIMESTAMP} = '{snapshot_timestamp}';" LOGGER.info(query) # Athena always has permissions to write to the bucket of the following structure account_id = get_account_id(context) region = os.environ['AWS_REGION'] output = f"s3://aws-athena-query-results-{account_id}-{region}" # It's not uncommon for the start_query_execution response to return a successful response # and then for the query to fail asynchronously. Check the Athena Query History to double check. resp = ATHENA_CLIENT.start_query_execution( QueryString=query, ResultConfiguration={ 'OutputLocation': output } ) LOGGER.info(resp) event['athena_query_execution_id'] = resp['QueryExecutionId'] return event # AWS Account ID is not available in the environment, but you can get it breaking a part the Lambda # arn def get_account_id(context): return context.invoked_function_arn.split(":")[4] def get_snapshot_timestamp(output_prefix): split = output_prefix.split("/") bucket = split[2] fmt = split[3] table = split[4] # The trailing slash is really important as it allows us to query S3 for the CommonPrefixes # which can be thought of as the next "folder" after a given prefix. prefix = f"{fmt}/{table}/" # Not sure if there can be so many prefixes that there's a continuation token. For now # assuming that there is not resp = S3_CLIENT.list_objects_v2( Bucket=bucket, Delimiter='/', Prefix=prefix ) # Each prefix will look like 'parquet/Reviews/snapshot_timestamp=2019-01-11T06:15/' prefixes = [x['Prefix'].split('/')[2].split('=')[1] for x in resp['CommonPrefixes']] # There's no documentation on what order the CommonPrefixes come on so assume none, but we can # assume they're lexographically sortable give how the snapshot timestamp is stored prefixes.sort(reverse=True) # If the state machine has gotten this far there will always be at least one prefix return prefixes[0] GetCurrentViewStatus: Type: AWS::Lambda::Function Properties: Handler: "index.lambda_handler" Runtime: python3.6 Role: Fn::GetAtt: - LambdaRole - Arn Code: ZipFile: |- import logging import os import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) ATHENA_CLIENT = boto3.client('athena') def lambda_handler(event, context): query_id = event['athena_query_execution_id'] resp = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_id) event['athena_query_status'] = resp['QueryExecution']['Status']['State'] return event Outputs: DynamoDBExportsBucket: Description: S3 bucket for DynamoDB exports Export: Name: DynamoDBExportsBucket Value: Ref: DynamoDBExportsBucket GlueCrawlerAndJobRole: Description: IAM Role for Glue Crawlers and Jobs Export: Name: GlueCrawlerAndJobRole Value: Ref: GlueCrawlerAndJobRole EventTriggerRole: Description: IAM Role for CloudWatch Events Export: Name: EventTriggerRole Value: Fn::GetAtt: - EventTriggerRole - Arn ExportStateMachineArn: Description: ARN for the Export State Machine Export: Name: ExportStateMachineArn Value: Ref: DynamoDBExportStateMachine
(追記ここから)
現在は、Glue ジョブやクローラーの依存関係や定期実行は Glue Workflow ですべて管理できるようになっています。
Glue Workflow に置き換えることで、本エントリの構成における Step Functions, Lambda関数、CloudWatch Events ルールはいずれも不要になり、シンプルな構成かつ料金も下がります。
Glue Workflow については以下をあわせてご参照ください。
(追記ここまで)
3. テーブルエクスポートスタックの作成
冒頭のブログの二つ目のテンプレートを用いて実装します。ここで作成されるリソースは以下です。
論理 ID | タイプ | リソース名 |
---|---|---|
ExportConverterGlueJob | AWS::Glue::Job | ReviewsExportTo<フォーマット名> |
GlueCrawler | AWS::Glue::Crawler | ReviewscsvCrawler |
Trigger | AWS::Events::Rule | AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-<ランダム文字列> |
なお、このテンプレートでは、デプロイ時に指定するパラメータが定義されています。
- MaxConsumedReadThroughput:読み取り容量の消費割合の上限
- OutputFormat:アウトプットのフォーマット
- TableName:エクスポート対象の DynamoDB テーブル名
今回は csv フォーマットでエクスポートしてみます。
こちらもテンプレートを載せておきます。
折り畳み
Description: Export a DynamoDB table to S3 with a Glue Job Parameters: TableName: Type: String Description: DynamoDB Table Name to export MaxConsumedReadThroughput: Type: Number Description: The maximum amount of Read Capacity Units the export is allowed to consumed expressed as a percentage MinValue: 0.001 MaxValue: 1.0 Default: 0.25 OutputFormat: Type: String Description: Output format of the export. One of avro, csv, json, orc, parquet, or xml. AllowedValues: - avro - csv - json - orc - parquet - xml Resources: Trigger: Type: "AWS::Events::Rule" Properties: Description: Fn::Sub: - "Start Export of ${Name} every night at midnight" - Name: Ref: TableName Targets: - Id: StepFunctions Arn: Fn::ImportValue: ExportStateMachineArn RoleArn: Fn::ImportValue: EventTriggerRole Input: Fn::Sub: - |- { "glue_job_name": "${GlueJobName}", "output_prefix": "${OutputPrefix}", "table_name": "${TableName}", "read_percentage": "${ReadPercentage}", "crawler_name": "${CrawlerName}", "output_format": "${OutputFormat}" } - GlueJobName: Fn::Sub: - "${Name}ExportTo${OutputFormat}" - Name: Ref: TableName OutputFormat: Ref: OutputFormat OutputPrefix: Fn::Sub: - "s3://${Bucket}/${OutputFormat}/${TableName}" - Bucket: Fn::ImportValue: DynamoDBExportsBucket TableName: Ref: TableName OutputFormat: Ref: OutputFormat OutputFormat: Ref: OutputFormat ReadPercentage: Ref: MaxConsumedReadThroughput CrawlerName: Fn::Sub: - "${Name}${OutputFormat}Crawler" - Name: Ref: TableName OutputFormat: Ref: OutputFormat ScheduleExpression: "cron(0 10 * * ? *)" State: DISABLED # Export table to S3 in the parquet format ExportConverterGlueJob: Type: "AWS::Glue::Job" Properties: Name: Fn::Sub: - "${Name}ExportTo${OutputFormat}" - Name: Ref: TableName OutputFormat: Ref: OutputFormat Role: Fn::ImportValue: GlueCrawlerAndJobRole MaxRetries: 3 Description: Fn::Sub: - Exports a DynamoDB table to ${OutputFormat} - OutputFormat: Ref: OutputFormat Command: # DO NOT CHANGE NAME. CloudFormation docs are wrong. Use Glue API docs: # http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html#aws-glue-api-jobs-job-JobCommand Name: "glueetl" ScriptLocation: "s3://aws-bigdata-blog/artifacts/using-glue-to-access-dynamodb-tables/export-dynamodb-table.py" AllocatedCapacity: 10 ExecutionProperty: MaxConcurrentRuns: 3 DefaultArguments: "--TempDir": Fn::Sub: - "s3://${Bucket}/glue-temp-dir" - Bucket: Fn::ImportValue: DynamoDBExportsBucket GlueCrawler: Type: AWS::Glue::Crawler Properties: Role: Fn::ImportValue: GlueCrawlerAndJobRole Name: Fn::Sub: - "${Name}${OutputFormat}Crawler" - Name: Ref: TableName OutputFormat: Ref: OutputFormat Description: Fn::Sub: - "Add new partitions and handle schema updates to the ${Name} table" - Name: Ref: TableName SchemaChangePolicy: UpdateBehavior: UPDATE_IN_DATABASE DeleteBehavior: DELETE_FROM_DATABASE DatabaseName: "dynamodb_exports" TablePrefix: "snapshots_" Targets: S3Targets: - Path: Fn::Sub: - "s3://${Bucket}/${OutputFormat}/${TableName}" - Bucket: Fn::ImportValue: DynamoDBExportsBucket TableName: Ref: TableName OutputFormat: Ref: OutputFormat
デプロイされたリソースの確認
次のステップに進む前に、2. と 3. のステップで作成された各種リソースを確認してみます。
Step Functions ステートマシン
このように作成されています。
AWS CLI での確認。
% aws stepfunctions describe-state-machine --state-machine-arn arn:aws:states:ap-northeast-1:000000000000:stateMachine:DynamoDBExportAndAthenaLoad { "stateMachineArn": "arn:aws:states:ap-northeast-1:000000000000:stateMachine:DynamoDBExportAndAthenaLoad", "name": "DynamoDBExportAndAthenaLoad", "status": "ACTIVE", "definition": "{\n 略 \n}", "roleArn": "arn:aws:iam::000000000000:role/AWSBigDataBlog-GlueDynamoExport-StateExecutionRole-CHS5BEKC9VBC", "type": "STANDARD", "creationDate": "2020-09-11T21:02:48.654000+09:00", "loggingConfiguration": { "level": "OFF", "includeExecutionData": false } }
定義の内訳はこのようになっています。
折り畳み
{ "StartAt": "StartJobRun", "States": { "StartJobRun": { "Type": "Task", "ResultPath": "$.glueresult", "Resource": "arn:aws:states:::glue:startJobRun.sync", "Parameters": { "JobName.$": "$.glue_job_name", "Arguments": { "--table_name.$": "$.table_name", "--read_percentage.$": "$.read_percentage", "--output_prefix.$": "$.output_prefix", "--output_format.$": "$.output_format" } }, "Next": "Start Crawler", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Export Failed" } ] }, "Export Failed": { "Type": "Fail", "Cause": "One or more steps could not complete successfully", "Error": "ExportFailed" }, "Start Crawler": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExportTa-StartGlueCrawler-LHEP9BGFS2NZ", "Next": "Wait for Crawler", "Retry": [ { "ErrorEquals": ["Lambda.ServiceException"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 1.5 } ] }, "Wait for Crawler": { "Type": "Wait", "Seconds": 60, "Next": "Get Crawler Status" }, "Get Crawler Status": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExpo-GetGlueCrawlerStatus-1D6OO6MQMQ4YJ", "Next": "Crawler Finished?" }, "Crawler Finished?": { "Type": "Choice", "Choices": [ { "Variable": "$.glue_crawler_status", "StringEquals": "SUCCEEDED", "Next": "Create Current View" }, { "Variable": "$.glue_crawler_status", "StringEquals": "CANCELLED", "Next": "Export Failed" }, { "Variable": "$.glue_crawler_status", "StringEquals": "FAILED", "Next": "Export Failed" } ], "Default": "Wait for Crawler" }, "Create Current View": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-K1KBFSLTS6G1", "Next": "Wait for Current View", "Retry": [ { "ErrorEquals": ["Lambda.ServiceException"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 1.5 } ] }, "Wait for Current View": { "Type": "Wait", "Seconds": 5, "Next": "Get Current View Status" }, "Get Current View Status": { "Type": "Task", "Resource": "arn:aws:lambda:ap-northeast-1:000000000000:function:AWSBigDataBlog-GlueDynamoExpo-GetCurrentViewStatus-SOWZAONHY5MJ", "Next": "Create Current View Finished?", "Retry": [ { "ErrorEquals": ["Lambda.ServiceException"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 1.5 } ] }, "Create Current View Finished?": { "Type": "Choice", "Choices": [ { "Variable": "$.athena_query_status", "StringEquals": "SUCCEEDED", "Next": "Done" }, { "Variable": "$.athena_query_status", "StringEquals": "FAILED", "Next": "Export Failed" }, { "Variable": "$.athena_query_status", "StringEquals": "CANCELLED", "Next": "Export Failed" } ], "Default": "Wait for Current View" }, "Done": { "Type": "Pass", "End": true } } }
ステートマシンでは、各ステップで Lambda 関数が呼び出されています。
それぞれのコードを確認しておきます。
Lambda 関数①(Start Crawler)
import logging import datetime import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) KEY_CRAWLER_NAME = 'crawler_name' GLUE_CLIENT = boto3.client('glue') def crawler_trigger(event, context): crawler_name = event[KEY_CRAWLER_NAME] GLUE_CLIENT.start_crawler( Name=crawler_name ) return event
Lambda 関数②(Get Crawler Status)
import json import logging import datetime from dateutil.parser import parse import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) GLUE_CLIENT = boto3.client('glue') KEY_GLUE_CRAWLER_NAME = "crawler_name" KEY_GLUE_CRAWLER_STATUS = "glue_crawler_status" KEY_STARTED_ON = "StartedOn" KEY_GLUE_RESULT = "glueresult" def lambda_handler(event, context): crawler_name = event[KEY_GLUE_CRAWLER_NAME] resp = GLUE_CLIENT.get_crawler(Name=crawler_name) print(resp) # In the case of an early exit make sure the key is present but the value # is a no op event[KEY_GLUE_CRAWLER_STATUS] = "" last_crawl = resp['Crawler'].get('LastCrawl') if last_crawl is None: return event last_crawl_start = last_crawl.get('StartTime') # Timestamp is stored as milliseconds since the epoch glue_job_start = datetime.datetime.fromtimestamp(event[KEY_GLUE_RESULT][KEY_STARTED_ON] / 1000) # If the last crawl is before the snapshot_timestamp then we're still waiting # for the current run we've initiated to finish if last_crawl_start is None or last_crawl_start.timestamp() < glue_job_start.timestamp(): return event event[KEY_GLUE_CRAWLER_STATUS] = last_crawl['Status'] return event
Lambda 関数③(Create Current View)
import logging import os import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) KEY_TABLE_NAME = "table_name" SNAPHSHOT_TIMESTAMP = "snapshot_timestamp" KEY_OUTPUT_PREFIX = "output_prefix" ATHENA_CLIENT = boto3.client('athena') S3_CLIENT = boto3.client('s3') def create_current_view_trigger(event, context): table_name = event[KEY_TABLE_NAME].lower().replace("-", "_") output_prefix = event[KEY_OUTPUT_PREFIX] snapshot_timestamp = get_snapshot_timestamp(output_prefix) query = f"CREATE OR REPLACE VIEW \"dynamodb_exports\".\"{table_name}\" AS SELECT * FROM \"dynamodb_exports\".\"snapshots_{table_name}\" WHERE {SNAPHSHOT_TIMESTAMP} = '{snapshot_timestamp}';" LOGGER.info(query) # Athena always has permissions to write to the bucket of the following structure account_id = get_account_id(context) region = os.environ['AWS_REGION'] output = f"s3://aws-athena-query-results-{account_id}-{region}" # It's not uncommon for the start_query_execution response to return a successful response # and then for the query to fail asynchronously. Check the Athena Query History to double check. resp = ATHENA_CLIENT.start_query_execution( QueryString=query, ResultConfiguration={ 'OutputLocation': output } ) LOGGER.info(resp) event['athena_query_execution_id'] = resp['QueryExecutionId'] return event # AWS Account ID is not available in the environment, but you can get it breaking a part the Lambda # arn def get_account_id(context): return context.invoked_function_arn.split(":")[4] def get_snapshot_timestamp(output_prefix): split = output_prefix.split("/") bucket = split[2] fmt = split[3] table = split[4] # The trailing slash is really important as it allows us to query S3 for the CommonPrefixes # which can be thought of as the next "folder" after a given prefix. prefix = f"{fmt}/{table}/" # Not sure if there can be so many prefixes that there's a continuation token. For now # assuming that there is not resp = S3_CLIENT.list_objects_v2( Bucket=bucket, Delimiter='/', Prefix=prefix ) # Each prefix will look like 'parquet/Reviews/snapshot_timestamp=2019-01-11T06:15/' prefixes = [x['Prefix'].split('/')[2].split('=')[1] for x in resp['CommonPrefixes']] # There's no documentation on what order the CommonPrefixes come on so assume none, but we can # assume they're lexographically sortable give how the snapshot timestamp is stored prefixes.sort(reverse=True) # If the state machine has gotten this far there will always be at least one prefix return prefixes[0]
Lambda 関数④(Get Current View Status)
import logging import os import boto3 LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) ATHENA_CLIENT = boto3.client('athena') def lambda_handler(event, context): query_id = event['athena_query_execution_id'] resp = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_id) event['athena_query_status'] = resp['QueryExecution']['Status']['State'] return event
Glue クローラ
% aws glue get-crawler --name ReviewscsvCrawler { "Crawler": { "Name": "ReviewscsvCrawler", "Role": "AWSGlueServiceRoleDefault", "Targets": { "S3Targets": [ { "Path": "s3://dynamodb-exports-000000000000-ap-northeast-1/csv/Reviews", "Exclusions": [] } ], "JdbcTargets": [], "DynamoDBTargets": [], "CatalogTargets": [] }, "DatabaseName": "dynamodb_exports", "Description": "Add new partitions and handle schema updates to the Reviews table", "Classifiers": [], "SchemaChangePolicy": { "UpdateBehavior": "UPDATE_IN_DATABASE", "DeleteBehavior": "DELETE_FROM_DATABASE" }, "State": "READY", "TablePrefix": "snapshots_", "CrawlElapsedTime": 0, "CreationTime": "2020-09-11T21:15:27+09:00", "LastUpdated": "2020-09-11T21:15:27+09:00", "Version": 1 } }
Glue ジョブ
外部の S3 にあるスクリプトを呼び出しています。
% aws glue get-job --job-name ReviewsExportTocsv { "Job": { "Name": "ReviewsExportTocsv", "Description": "Exports a DynamoDB table to csv", "Role": "AWSGlueServiceRoleDefault", "CreatedOn": "2020-09-11T21:15:26.598000+09:00", "LastModifiedOn": "2020-09-11T21:15:26.598000+09:00", "ExecutionProperty": { "MaxConcurrentRuns": 3 }, "Command": { "Name": "glueetl", "ScriptLocation": "s3://aws-bigdata-blog/artifacts/using-glue-to-access-dynamodb-tables/export-dynamodb-table.py", "PythonVersion": "2" }, "DefaultArguments": { "--TempDir": "s3://dynamodb-exports-000000000000-ap-northeast-1/glue-temp-dir" }, "MaxRetries": 3, "AllocatedCapacity": 10, "Timeout": 2880, "MaxCapacity": 10.0 } }
(追記ここから)
ここでは、以下のバージョン・設定となっています。
- PythonVersion:2
- GlueVersion:指定なし(0.9)
- WorkerType:指定なし(Standard)
- NumberOfWorkers:指定なし(代わりに MaxCapacity=10)
2020年11月時点としては、以下の構成に置き換えることをおすすめします。特に、GlueVersion 2.0 に変更することを強く推奨します。
- PythonVersion:3
- GlueVersion:2.0
- WorkerType:G.1X もしくは G.2X
- NumberOfWorkers:10
(追記ここまで)
このスクリプトの内訳は、冒頭のブログに記載のある以下が該当します。
import sys import datetime from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext ARG_TABLE_NAME = "table_name" ARG_READ_PERCENT = "read_percentage" ARG_OUTPUT = "output_prefix" ARG_FORMAT = "output_format" PARTITION = "snapshot_timestamp" args = getResolvedOptions(sys.argv, [ 'JOB_NAME', ARG_TABLE_NAME, ARG_READ_PERCENT, ARG_OUTPUT, ARG_FORMAT ] ) table_name = args[ARG_TABLE_NAME] read = args[ARG_READ_PERCENT] output_prefix = args[ARG_OUTPUT] fmt = args[ARG_FORMAT] print("Table name:", table_name) print("Read percentage:", read) print("Output prefix:", output_prefix) print("Format:", fmt) date_str = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M') output = "%s/%s=%s" % (output_prefix, PARTITION, date_str) sc = SparkContext() glueContext = GlueContext(sc) table = glueContext.create_dynamic_frame.from_options( "dynamodb", connection_options={ "dynamodb.input.tableName": table_name, "dynamodb.throughput.read.percent": read } ) glueContext.write_dynamic_frame.from_options( frame=table, connection_type="s3", connection_options={ "path": output }, format=fmt, transformation_ctx="datasink" )
(追記ここから)
ここではdynamodb.splits
というパラメータが有効化されていないため、読み取りが並列化されていません。今回の構成では Glue クラスターのリソースとして DPU = 10 が割り当てられていますが、そのほとんどが活用されないということになります。パラメータを明示的に指定することをお勧めします。
(追記ここまで)
Events ルール
今回は Amazon EventBridge の画面から確認しましたが、CloudWatch の画面からも確認できます。
CloudFormation からデプロイした際には定期実行が無効に設定されています。この仕組みを用いて定期実行させたい場合には、有効化する必要があります。
また、後続の手順で手動実行を行います。その際には、ステートマシンに引き渡すインプットをここからコピーしたものを用います。
AWS CLI での確認結果です。
% aws events describe-rule --name AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY { "Name": "AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY", "Arn": "arn:aws:events:ap-northeast-1:000000000000:rule/AWSBigDataBlog-GlueDynamoExportTableExport-Trigger-1CX0PNI843MPY", "ScheduleExpression": "cron(0 10 * * ? *)", "State": "DISABLED", "Description": "Start Export of Reviews every night at midnight", "EventBusName": "default" }
4. DynamoDB から S3 へのエクスポートの実行
リソースの作成が完了したため、実際に実行してみます。
ステートマシンの画面より、[ 実行の開始 ] を押下します。
先ほどコピーしたインプットを貼り付け、実行します。
実行の状況は以下のようにグラフィカルに確認できます。正常に終了するのを待ちます。
5. エクスポート後の内容の確認
ステートマシンの実行が正常に終了すると、 Glue テーブルが 2つ作成されています。
一つ目のテーブル。
AWS CLI でも見てみます。
% aws glue get-table --database-name dynamodb_exports --name reviews { "Table": { "Name": "reviews", "DatabaseName": "dynamodb_exports", "CreateTime": "2020-09-11T22:44:08+09:00", "UpdateTime": "2020-09-11T22:44:08+09:00", "Retention": 0, "StorageDescriptor": { "Columns": [ { "Name": "rating", "Type": "bigint" }, { "Name": "review", "Type": "string" }, { "Name": "book", "Type": "string" }, { "Name": "author", "Type": "string" }, { "Name": "user", "Type": "string" }, { "Name": "snapshot_timestamp", "Type": "string" } ], "Location": "", "Compressed": false, "NumberOfBuckets": 0, "SerdeInfo": {}, "SortColumns": [], "StoredAsSubDirectories": false }, "PartitionKeys": [], "ViewOriginalText": "/* Presto View: eyJvcmlnaW5hbFNxbCI6IlNFTEVDVCAqXG5GUk9NXG4gIGR5bmFtb2RiX2V4cG9ydHMuc25hcHNob3RzX3Jldmlld3NcbldIRVJFIChcInNuYXBzaG90X3RpbWVzdGFtcFwiID0gJzIwMjAtMDktMTFUMTM6NDEnKVxuIiwiY2F0YWxvZyI6ImF3c2RhdGFjYXRhbG9nIiwic2NoZW1hIjoiZGVmYXVsdCIsImNvbHVtbnMiOlt7Im5hbWUiOiJyYXRpbmciLCJ0eXBlIjoiYmlnaW50In0seyJuYW1lIjoicmV2aWV3IiwidHlwZSI6InZhcmNoYXIifSx7Im5hbWUiOiJib29rIiwidHlwZSI6InZhcmNoYXIifSx7Im5hbWUiOiJhdXRob3IiLCJ0eXBlIjoidmFyY2hhciJ9LHsibmFtZSI6InVzZXIiLCJ0eXBlIjoidmFyY2hhciJ9LHsibmFtZSI6InNuYXBzaG90X3RpbWVzdGFtcCIsInR5cGUiOiJ2YXJjaGFyIn1dfQ== */", "ViewExpandedText": "/* Presto View */", "TableType": "VIRTUAL_VIEW", "Parameters": { "comment": "Presto View", "presto_view": "true" }, "CreatedBy": "arn:aws:sts::000000000000:assumed-role/AWSBigDataBlog-GlueDynamoExportTableCom-LambdaRole-YDE0JG71FHAD/AWSBigDataBlog-GlueDynamoExportT-CreateCurrentView-K1KBFSLTS6G1", "IsRegisteredWithLakeFormation": false, "CatalogId": "000000000000" } }
2つ目のテーブル。
% aws glue get-table --database-name dynamodb_exports --name snapshots_reviews { "Table": { "Name": "snapshots_reviews", "DatabaseName": "dynamodb_exports", "Owner": "owner", "CreateTime": "2020-09-11T22:42:51+09:00", "UpdateTime": "2020-09-11T22:42:51+09:00", "LastAccessTime": "2020-09-11T22:42:51+09:00", "Retention": 0, "StorageDescriptor": { "Columns": [ { "Name": "rating", "Type": "bigint" }, { "Name": "review", "Type": "string" }, { "Name": "book", "Type": "string" }, { "Name": "author", "Type": "string" }, { "Name": "user", "Type": "string" } ], "Location": "s3://dynamodb-exports-000000000000-ap-northeast-1/csv/Reviews/", "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", "Compressed": false, "NumberOfBuckets": -1, "SerdeInfo": { "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "Parameters": { "field.delim": "," } }, "BucketColumns": [], "SortColumns": [], "Parameters": { "CrawlerSchemaDeserializerVersion": "1.0", "CrawlerSchemaSerializerVersion": "1.0", "UPDATED_BY_CRAWLER": "ReviewscsvCrawler", "areColumnsQuoted": "false", "averageRecordSize": "73", "classification": "csv", "columnsOrdered": "true", "compressionType": "none", "delimiter": ",", "objectCount": "20", "recordCount": "4", "sizeKey": "297", "skip.header.line.count": "1", "typeOfData": "file" }, "StoredAsSubDirectories": false }, "PartitionKeys": [ { "Name": "snapshot_timestamp", "Type": "string" } ], "TableType": "EXTERNAL_TABLE", "Parameters": { "CrawlerSchemaDeserializerVersion": "1.0", "CrawlerSchemaSerializerVersion": "1.0", "UPDATED_BY_CRAWLER": "ReviewscsvCrawler", "areColumnsQuoted": "false", "averageRecordSize": "73", "classification": "csv", "columnsOrdered": "true", "compressionType": "none", "delimiter": ",", "objectCount": "20", "recordCount": "4", "sizeKey": "297", "skip.header.line.count": "1", "typeOfData": "file" }, "CreatedBy": "arn:aws:sts::000000000000:assumed-role/AWSGlueServiceRoleDefault/AWS-Crawler", "IsRegisteredWithLakeFormation": false, "CatalogId": "000000000000" } }
これらのテーブルに対して、 Athena の画面からクエリを実行できます。
出力先の S3 バケットにはこのようにオブジェクトが生成されています。
% aws s3 ls dynamodb-exports-000000000000-ap-northeast-1 --recursive 2020-09-11 22:41:25 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00000 2020-09-11 22:41:25 149 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00001 2020-09-11 22:41:25 148 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00002 2020-09-11 22:41:25 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00003 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00004 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00005 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00006 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00007 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00008 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00009 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00010 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00011 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00012 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00013 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00014 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00015 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00016 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00017 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00018 2020-09-11 22:41:26 0 csv/Reviews/snapshot_timestamp=2020-09-11T13:41/run-1599831677824-part-r-00019
ファイルサイズが 0 でないものを 2つ確認すると、それぞれ以下のような中身になっています。DynamoDB テーブルに登録したアイテムが、 csv 形式でエクスポートされています。
Rating,Review,Book,Author,User 5,"A thrilling journey through the world of Hogwarts","Harry Potter and the Philosopher's Stone",J.K.Rowling,Tristan
Rating,Review,Book,Author,User 4,"Harry is pretty brave, but Hermione is the clear hero","Harry Potter and the Sorcerer's Stone",J.K.Rowling,Adeline
これで DynamoDB から S3 へのエクスポートを確認できました。Event ルールの定期実行を有効化すれば、自動で継続的なエクスポートが実現できます。
終わりに
AWS Glue と Step Functions を利用した、DynamoDB テーブルの S3 バケットへのエクスポートを確認しました。
AWS Glue のことがよく分かっていなくても、用意されたテンプレートに従って実行するだけで仕組みを実装することができました。必要に応じてカスタマイズして使用すれば、様々な要件に対応できそうですね。
以上、千葉(幸)がお送りしました。